#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Splits a branch into smaller branches and uploads CLs."""

import collections
import dataclasses
import os
import re
import subprocess2
import sys
from typing import List, Set, Tuple, Dict, Any

import gclient_utils
import git_footers
import scm

import git_common as git

# If a call to `git cl split` will generate more than this number of CLs, the
# command prints a warning (see PrintSummary) so that the user can make sure
# they know what they're doing before confirming. Large numbers of CLs
# generated by `git cl split` have caused infrastructure issues in the past.
CL_SPLIT_FORCE_LIMIT = 10

# The maximum number of top reviewers to list. `git cl split` may send many CLs
# to a single reviewer, so the top reviewers with the most CLs sent to them
# will be listed.
CL_SPLIT_TOP_REVIEWERS = 5


def EmitWarning(msg: str) -> None:
    """Prints |msg| to stdout behind a "Warning: " label."""
    # The label and |msg| are passed as two print() arguments, so print's
    # default single-space separator is added after the label's own trailing
    # space.
    print("Warning: ", msg)


def HashList(lst: List[Any]) -> int:
    """
    Hash a list, returning a non-negative integer. Lists with identical
    elements have the same hash, regardless of order.

    The result is stable across interpreter runs. The built-in hash() is not
    used because it is salted per process for strings (see PYTHONHASHSEED),
    which would give the same list a different value on every invocation.

    Args:
        lst: The items to hash. They must be mutually sortable and have a
            stable repr() (strings, numbers and tuples of those do).

    Returns:
        An integer in the range [0, 2**64).
    """
    # Imported locally so that this function carries its own dependency.
    import hashlib

    digest = hashlib.sha256()
    # Sorting makes the result independent of the order of |lst|.
    for item in sorted(lst):
        digest.update(repr(item).encode("utf-8"))
        # Delimit items so that adjacent reprs cannot run together.
        digest.update(b"\0")
    # 8 bytes are plenty for naming purposes and never exceed 20 decimal
    # digits.
    return int.from_bytes(digest.digest()[:8], "big")

# Groups everything that goes to one set of reviewers:
# - files: a list of (action, file) pairs.
# - owners_directories: a list of the directories under which those files were
#   grouped, with '/' as the path separator.
FilesAndOwnersDirectory = collections.namedtuple("FilesAndOwnersDirectory",
                                                 "files owners_directories")


@dataclasses.dataclass
class CLInfo:
    """
    Data structure representing a single CL. The script will split the large CL
    into a list of these.

    Fields:
    - reviewers: the reviewers the CL will be sent to.
    - files: a list of <action>, <file> pairs in the CL.
             Has the same format as `git status`.
    - directories: a string representing the directories containing the files
                   in this CL. This is only used for replacing $directory in
                   the user-provided CL description.
    """
    # Have to use default_factory because sets and lists are mutable
    reviewers: Set[str] = dataclasses.field(default_factory=set)
    files: List[Tuple[str, str]] = dataclasses.field(default_factory=list)

    # This is only used for formatting in the CL description, so it just
    # has to be convertible to string.
    directories: Any = ""

    def FormatForPrinting(self) -> str:
        """
        Format the CLInfo for printing to a file in a human-readable format.

        Returns:
            A newline-joined string: a "Reviewers: [...]" line, a
            "Directories: ..." line, then one "<action>, <file>" line per
            entry in |files|.
        """
        # Don't quote the reviewer emails in the output. |reviewers| is a set,
        # so sort it to keep the output stable from one run to the next.
        reviewers_str = ", ".join(sorted(self.reviewers))
        header = [
            f"Reviewers: [{reviewers_str}]",
            f"Directories: {self.directories}",
        ]
        file_lines = [f"{action}, {file}" for (action, file) in self.files]
        return "\n".join(header + file_lines)


def CLInfoFromFilesAndOwnersDirectoriesDict(
        d: Dict[Tuple[str], FilesAndOwnersDirectory]) -> List[CLInfo]:
    """
    Build one CLInfo per entry of |d|.

    Each key of |d| is a tuple of reviewers and each value is the
    FilesAndOwnersDirectory assigned to them. The reviewers become a set; the
    files and owners directories are passed through as they are. The result
    follows the iteration order of |d|.
    """
    return [
        CLInfo(set(reviewer_tuple), entry.files, entry.owners_directories)
        for reviewer_tuple, entry in d.items()
    ]


def EnsureInGitRepository() -> None:
    """Throws an exception if the current directory is not a git repository."""
    # `git rev-parse` with no arguments is used purely as a probe; its output
    # is discarded. Presumably git.run raises when git exits non-zero (SplitCl
    # catches subprocess2.CalledProcessError) -- confirm in git_common.
    git.run('rev-parse')


def CreateBranchName(prefix: str, files: List[Tuple[str, str]]) -> str:
    """
    Given a sub-CL as a list of (action, file) pairs, create a unique and
    deterministic branch name for it.
    The name has the format <prefix>_<hash(files)>_<common path>_split, where
    <common path> is the deepest directory shared by all files with its path
    separators replaced by underscores, or "None" if there is no such
    directory.

    Args:
        prefix: The leading component of the branch name.
        files: The (action, file) pairs in the sub-CL.

    Returns:
        The branch name.
    """
    file_names = [file for _, file in files]
    if not file_names:
        # os.path.commonpath() raises ValueError on an empty sequence.
        common_path = ""
    elif len(file_names) == 1:
        # Only one file, just use its directory as the common path
        common_path = os.path.dirname(file_names[0])
    else:
        common_path = os.path.commonpath(file_names)
    if not common_path:
        # Files have nothing in common at all. Unlikely but possible.
        common_path = "None"
    # Replace path delimiters with underscores in common_path. Both '/' and the
    # native separator are handled: os.path.dirname() leaves '/' untouched,
    # whereas os.path.commonpath() returns native separators.
    for separator in ('/', os.path.sep):
        common_path = common_path.replace(separator, '_')
    return f"{prefix}_{HashList(files):020}_{common_path}_split"


def CreateBranchForOneCL(prefix: str, files: List[Tuple[str, str]],
                         upstream: str) -> bool:
    """Creates the branch for one sub-CL, unless it already exists.

    The branch is named by CreateBranchName(|prefix|, |files|) and tracks
    |upstream|.

    Returns:
        True if the branch was created and checked out, False if a branch with
        that name already exists (in which case nothing is done).
    """
    known_branches = set(git.branches(use_limit=False))
    new_branch = CreateBranchName(prefix, files)
    already_exists = new_branch in known_branches
    if not already_exists:
        git.run('checkout', '-t', upstream, '-b', new_branch)
    return not already_exists


def FormatDirectoriesForPrinting(directories, prefix=None):
    """Formats directory list for printing

    Uses dedicated format for single-item list: the item itself is printed
    rather than the list. Any other length, including empty, is printed as
    str() of the whole collection.

    Args:
        directories: The directories to format.
        prefix: If truthy, a string prepended to every directory.

    Returns:
        The formatted string.
    """
    prefixed = directories
    if prefix:
        prefixed = [(prefix + d) for d in directories]

    # Index only when there is exactly one item, so empty input cannot raise
    # IndexError.
    if len(prefixed) == 1:
        return str(prefixed[0])
    return str(prefixed)


def FormatDescriptionOrComment(txt, directories):
    """Returns |txt| with every '$directory' replaced.

    The replacement text is |directories| as rendered by
    FormatDirectoriesForPrinting.
    """
    return txt.replace('$directory',
                       FormatDirectoriesForPrinting(directories))


def AddUploadedByGitClSplitToDescription(description):
    """Adds a 'This CL was uploaded by git cl split.' line to |description|.

    The line is added before footers, or at the end of |description| if it has
    no footers. A blank line separates it from the preceding text and from the
    footers.

    Args:
        description: The CL description.

    Returns:
        The description with the extra line inserted.
    """
    split_footers = git_footers.split_footers(description)
    message_lines = split_footers[0]
    footer_lines = split_footers[1]

    # New lists are built throughout so that the lists returned by
    # split_footers are never modified in place.
    result = list(message_lines)
    # The emptiness check keeps a description without any message lines from
    # raising IndexError on result[-1].
    if result and result[-1] and not result[-1].isspace():
        result.append('')
    result.append('This CL was uploaded by git cl split.')
    if footer_lines:
        result.append('')
        result.extend(footer_lines)
    return '\n'.join(result)


def UploadCl(refactor_branch, refactor_branch_upstream, directories, files,
             description, comment, reviewers, changelist, cmd_upload,
             cq_dry_run, enable_auto_submit, topic, repository_root):
    """Uploads a CL with all changes to |files| in |refactor_branch|.

    Creates a dedicated branch for the CL, brings the changes to |files| over
    from |refactor_branch|, commits them with |description| and uploads the
    result. Does nothing apart from printing a message if the branch for this
    CL already exists.

    Args:
        refactor_branch: Name of the branch that contains the changes to upload.
        refactor_branch_upstream: Name of the upstream of |refactor_branch|.
        directories: Paths to the directories that contain the OWNERS files for
            which to upload a CL.
        files: List of (action, file) pairs to include in the uploaded CL. An
            action of 'D' marks a deleted file.
        description: Description of the uploaded CL.
        comment: Comment to post on the uploaded CL.
        reviewers: A set of reviewers for the CL.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        topic: Topic to associate with uploaded CLs.
        repository_root: Directory that the paths in |files| are joined onto to
            obtain absolute paths.
    """
    # Create a branch.
    if not CreateBranchForOneCL(refactor_branch, files,
                                refactor_branch_upstream):
        print('Skipping ' + FormatDirectoriesForPrinting(directories) +
              ' for which a branch already exists.')
        return

    # Checkout all changes to files in |files|. Deleted files are removed with
    # `git rm`; everything else is checked out from |refactor_branch|.
    deleted_files = []
    modified_files = []
    for action, f in files:
        abspath = os.path.abspath(os.path.join(repository_root, f))
        if action == 'D':
            deleted_files.append(abspath)
        else:
            modified_files.append(abspath)

    if deleted_files:
        git.run(*['rm'] + deleted_files)
    if modified_files:
        git.run(*['checkout', refactor_branch, '--'] + modified_files)

    # Commit changes. The temporary file is created with delete=False so that it
    # can be deleted manually after git has read it rather than automatically
    # when it is closed.
    with gclient_utils.temporary_file() as tmp_file:
        gclient_utils.FileWrite(
            tmp_file, FormatDescriptionOrComment(description, directories))
        git.run('commit', '-F', tmp_file)

    # Upload a CL.
    upload_args = ['-f']
    if reviewers:
        # Sorted so that the reviewer list does not depend on set ordering.
        upload_args.extend(['-r', ','.join(sorted(reviewers))])
    if cq_dry_run:
        upload_args.append('--cq-dry-run')
    # NOTE(review): --send-mail is only passed when there is no comment;
    # presumably publishing the comment below sends the mail instead -- confirm.
    if not comment:
        upload_args.append('--send-mail')
    if enable_auto_submit:
        upload_args.append('--enable-auto-submit')
    if topic:
        upload_args.append('--topic={}'.format(topic))
    print('Uploading CL for ' + FormatDirectoriesForPrinting(directories) +
          '...')

    ret = cmd_upload(upload_args)
    if ret != 0:
        print('Uploading failed.')
        print('Note: git cl split has built-in resume capabilities.')
        print('Delete ' + git.current_branch() +
              ' then run git cl split again to resume uploading.')

    # NOTE(review): the comment is posted even when the upload above reported
    # a failure -- confirm that this is intended.
    if comment:
        changelist().AddComment(FormatDescriptionOrComment(
            comment, directories),
                                publish=True)


def GetFilesSplitByOwners(files, max_depth):
    """Returns a map of files split by OWNERS file.

    Args:
        files: List of (action, path) pairs.
        max_depth: The maximum directory depth at which to start looking for
            an OWNERS file. A value less than 1 means no limit.

    Returns:
        A map where keys are paths to directories containing an OWNERS file and
        values are lists of (action, path) pairs sharing an OWNERS file. Files
        for which no OWNERS file is found in any parent directory are grouped
        under the topmost directory reached.
    """
    files_split_by_owners = {}
    for action, path in files:
        # normpath() is important to normalize separators here, in preparation
        # for str.split() below. It would be nicer to use something like
        # pathlib here but alas...
        dir_with_owners = os.path.normpath(os.path.dirname(path))
        if max_depth >= 1:
            dir_with_owners = os.path.join(
                *dir_with_owners.split(os.path.sep)[:max_depth])
        # Find the closest parent directory with an OWNERS file.
        while (dir_with_owners not in files_split_by_owners
               and not os.path.isfile(os.path.join(dir_with_owners, 'OWNERS'))):
            parent = os.path.dirname(dir_with_owners)
            if parent == dir_with_owners:
                # Reached the top ('' or a filesystem root) without finding an
                # OWNERS file; dirname() no longer makes progress, so stop
                # instead of looping forever.
                break
            dir_with_owners = parent
        files_split_by_owners.setdefault(dir_with_owners, []).append(
            (action, path))
    return files_split_by_owners


def PrintClInfo(cl_index, num_cls, directories, file_paths, description,
                reviewers, cq_dry_run, enable_auto_submit, topic):
    """Prints a human-readable summary of one CL to stdout.

    Args:
        cl_index: The index of this CL in the list of CLs to upload.
        num_cls: The total number of CLs that will be uploaded.
        directories: Paths to directories that contain the OWNERS files for
            which to upload a CL. Also substituted for $directory in
            |description|.
        file_paths: A list of files in this CL.
        description: The CL description.
        reviewers: A set of reviewers for this CL.
        cq_dry_run: If the CL should also be sent to CQ dry run.
        enable_auto_submit: If the CL should also have auto submit enabled.
        topic: Topic to set for this CL.
    """
    formatted = FormatDescriptionOrComment(description, directories)
    # Indent every line of the description by four spaces.
    indented_description = '\n'.join('    ' + line
                                     for line in formatted.splitlines())

    print(f'CL {cl_index}/{num_cls}')
    print(f'Paths: {FormatDirectoriesForPrinting(directories)}')
    reviewer_list = ', '.join(reviewers)
    print(f'Reviewers: {reviewer_list}')
    print(f'Auto-Submit: {enable_auto_submit}')
    print(f'CQ Dry Run: {cq_dry_run}')
    print(f'Topic: {topic}')
    print('\n' + indented_description + '\n')
    print('\n'.join(file_paths))
    print()


def LoadDescription(description_file, dry_run):
    """Returns the CL description to use.

    Args:
        description_file: Path of the file holding the description, or a falsy
            value if none was given.
        dry_run: Whether this is a dry run.

    Returns:
        The contents of |description_file| if one was given; otherwise, during
        a dry run, a placeholder description.

    Raises:
        ValueError: If no description file was given outside of a dry run.
    """
    if description_file:
        return gclient_utils.FileRead(description_file)

    if dry_run:
        return ('Dummy description for dry run.\n'
                'directory = $directory')

    # Parser checks this as well, so should be impossible
    raise ValueError("Must provide a description file except during dry runs")


def PrintSummary(cl_infos, refactor_branch):
    """Print a brief summary of the splitting so the user
       can review it before uploading.

    One line is printed per CL, followed by the total number of CLs. A warning
    is emitted as well when that total exceeds CL_SPLIT_FORCE_LIMIT.

    Args:
       cl_infos: The list of CLInfo objects describing the CLs to upload.
       refactor_branch: Name of the branch that is being split.
    """
    for cl_info in cl_infos:
        file_count = len(cl_info.files)
        print(f'Reviewers: {cl_info.reviewers}, files: {file_count}, ',
              f'directories: {cl_info.directories}')

    total = len(cl_infos)
    print(f'\nWill split branch {refactor_branch} into {total} CLs. '
          'Please quickly review them before proceeding.\n')

    if total <= CL_SPLIT_FORCE_LIMIT:
        return

    EmitWarning('Uploading this many CLs may potentially '
                'reach the limit of concurrent runs, imposed on you by the '
                'build infrastructure. Your runs may be throttled as a '
                'result.\n\nPlease email infra-dev@chromium.org if you '
                'have any questions. '
                'The infra team reserves the right to cancel '
                'your jobs if they are overloading the CQ.\n\n'
                '(Alternatively, you can reduce the number of CLs created by '
                'using the --max-depth option. Pass --dry-run to examine the '
                'CLs which will be created until you are happy with the '
                'results.)')


def SplitCl(description_file, comment_file, changelist, cmd_upload, dry_run,
            cq_dry_run, enable_auto_submit, max_depth, topic, repository_root):
    """Splits a branch into smaller branches and uploads CLs.

    The files changed on the current branch are grouped by suggested
    reviewers, and one CL per group is uploaded (or, in a dry run, printed).

    Args:
        description_file: File containing the description of uploaded CLs.
        comment_file: File containing the comment of uploaded CLs.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        dry_run: Whether this is a dry run (no branches or CLs created).
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        max_depth: The maximum directory depth to search for OWNERS files. A
            value less than 1 means no limit.
        topic: Topic to associate with split CLs.
        repository_root: Root directory of the repository; used to collect the
            changed files and to resolve their paths.

    Returns:
        0 in case of success, or if the user declines to proceed at one of the
        prompts. 1 in case of error (nothing to split, or a failed command).
    """

    description = LoadDescription(description_file, dry_run)
    description = AddUploadedByGitClSplitToDescription(description)

    comment = gclient_utils.FileRead(comment_file) if comment_file else None

    try:
        EnsureInGitRepository()

        cl = changelist()
        upstream = cl.GetCommonAncestorWithUpstream()
        # (action, file) pairs for everything changed relative to |upstream|.
        files = [
            (action.strip(), f)
            for action, f in scm.GIT.CaptureStatus(repository_root, upstream)
        ]

        if not files:
            print('Cannot split an empty CL.')
            return 1

        author = git.run('config', 'user.email').strip() or None
        refactor_branch = git.current_branch()
        assert refactor_branch, "Can't run from detached branch."
        refactor_branch_upstream = git.upstream(refactor_branch)
        assert refactor_branch_upstream, \
            "Branch %s must have an upstream." % refactor_branch

        # The bug-link prompt is skipped for dry runs, which upload nothing.
        if not dry_run and not CheckDescriptionBugLink(description):
            return 0

        files_split_by_reviewers = SelectReviewersForFiles(
            cl, author, files, max_depth)
        cl_infos = CLInfoFromFilesAndOwnersDirectoriesDict(
            files_split_by_reviewers)

        if not dry_run:
            PrintSummary(cl_infos, refactor_branch)
            answer = gclient_utils.AskForData('Proceed? (y/N):')
            if answer.lower() != 'y':
                return 0

        # Number of CLs that each reviewer receives.
        cls_per_reviewer = collections.defaultdict(int)
        for cl_index, cl_info in enumerate(cl_infos, 1):
            # Dry runs only print what would be uploaded.
            if dry_run:
                file_paths = [f for _, f in cl_info.files]
                PrintClInfo(cl_index, len(cl_infos), cl_info.directories,
                            file_paths, description, cl_info.reviewers,
                            cq_dry_run, enable_auto_submit, topic)
            else:
                UploadCl(refactor_branch, refactor_branch_upstream,
                         cl_info.directories, cl_info.files, description,
                         comment, cl_info.reviewers, changelist, cmd_upload,
                         cq_dry_run, enable_auto_submit, topic, repository_root)

            for reviewer in cl_info.reviewers:
                cls_per_reviewer[reviewer] += 1

        # List the top reviewers that will be sent the most CLs as a result of
        # the split.
        reviewer_rankings = sorted(cls_per_reviewer.items(),
                                   key=lambda item: item[1],
                                   reverse=True)
        print('The top reviewers are:')
        for reviewer, count in reviewer_rankings[:CL_SPLIT_TOP_REVIEWERS]:
            print(f'    {reviewer}: {count} CLs')

        # Go back to the original branch.
        git.run('checkout', refactor_branch)

    except subprocess2.CalledProcessError as cpe:
        # A failed command aborts the split; its stderr is passed on to the
        # user.
        sys.stderr.write(cpe.stderr)
        return 1
    return 0


def CheckDescriptionBugLink(description):
    """Checks |description| for a bug link and prompts if there is none.

    A bug link is a line starting with "Bug:", followed by an optional
    alphabetic project prefix and a number.

    Examples:
        Bug: 123
        Bug: chromium:456

    Returns:
        True if a bug link was found, or if the user answered 'y' (in either
        case) when asked whether to proceed without one. False otherwise.
    """
    has_bug_link = re.search(r"^Bug:\s*(?:[a-zA-Z]+:)?[0-9]+", description,
                             re.MULTILINE)
    if has_bug_link:
        return True

    reply = gclient_utils.AskForData(
        'Description does not include a bug link. Proceed? (y/N):')
    return reply.lower() == 'y'


def SelectReviewersForFiles(cl, author, files, max_depth):
    """Groups the passed-in files by the reviewers suggested for them.

    The files are first split by OWNERS directory (see GetFilesSplitByOwners).
    Reviewers are then suggested for each directory's files, and directories
    with identical suggestions are merged.

    Args:
        cl: Changelist class instance
        author: Email of person running 'git cl split'
        files: List of (action, file) pairs
        max_depth: The maximum directory depth to search for OWNERS files.
            A value less than 1 means no limit.

    Returns:
        A dictionary mapping each tuple of reviewers to the
        FilesAndOwnersDirectory holding the files and directories assigned to
        them.
    """
    by_reviewers = {}

    owners_split = GetFilesSplitByOwners(files, max_depth)
    for owners_dir, dir_files in owners_split.items():
        # Use '/' as a path separator in the branch name and the CL description
        # and comment.
        display_dir = owners_dir.replace(os.path.sep, '/')
        paths = [path for _, path in dir_files]
        suggested = cl.owners_client.SuggestOwners(
            paths, exclude=[author, cl.owners_client.EVERYONE])
        # A tuple, unlike a list, can be used as a dictionary key.
        key = tuple(suggested)

        entry = by_reviewers.get(key)
        if entry is None:
            entry = FilesAndOwnersDirectory([], [])
            by_reviewers[key] = entry
        entry.files.extend(dir_files)
        entry.owners_directories.append(display_dir)

    return by_reviewers
